#!/usr/bin/env python
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import fcntl
import getopt
import os
import re
import signal
import subprocess
import sys
import time
from socket import gethostname
from typing import TYPE_CHECKING, Any, Optional
from urllib.parse import urlencode
from urllib.request import urlopen

if TYPE_CHECKING:
    from collections.abc import Iterable, Iterator
    from types import FrameType
    from urllib.parse import _QueryType

    from _typeshed import StrOrBytesPath


# The pCache version string (copied into Pcache.version and printed by -V/--version)
pcacheversion = "4.2.3"

# Log message level labels; INFO and WARN carry a trailing space so that all
# four labels are exactly five characters wide.
DEBUG, INFO, WARN, ERROR = "DEBUG", "INFO ", "WARN ", "ERROR"

# File name used for locking (its use is outside the part of the file shown here)
LOCK_NAME = ".LOCK"

# Session ID: "<epoch seconds when this module was loaded>.<pid of this process>"
sessid = "%s.%s" % (int(time.time()), os.getpid())


# Run a command with a timeout
def run_cmd(args: "subprocess._CMD", timeout: int = 0) -> tuple[int, Optional[bytes]]:
    '''
    Run a command as a subprocess, optionally bounded by a timeout.

    stdout and stderr of the command are merged and captured.

    :param args: command and arguments, executed without a shell
    :param timeout: maximum run time in whole seconds; 0 (or less) disables the timeout
    :returns: (status, output) where status is
              - the command's return code, with output holding the captured bytes
                (one trailing newline, if present, is stripped),
              - -1 with output None if the command timed out and was killed,
              - -2 with output None if the command could not be started.

    The timeout is implemented with SIGALRM, so it only works in the main thread.
    The previously installed SIGALRM handler is put back before returning.
    '''

    class Alarm(Exception):
        '''Raised by the SIGALRM handler when the timeout expires.'''

    def alarm_handler(signum: int, frame: Optional["FrameType"]) -> None:
        raise Alarm

    # Execute the command as a subprocess
    try:
        p = subprocess.Popen(args=args,
                             shell=False,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)

    # Any launch failure is reported as "not found"; KeyboardInterrupt and
    # SystemExit are deliberately not swallowed here.
    except Exception:
        return (-2, None)

    # Set the timer if a timeout value was given
    use_alarm = timeout > 0
    previous_handler = None
    if use_alarm:
        previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
        signal.alarm(timeout)

    # Wait for the command to complete
    try:
        try:
            # Collect the output when the command completes
            stdout = p.communicate()[0]
        finally:
            # Always cancel the alarm and give the signal back to its previous
            # owner, whether the command finished, timed out or failed.
            if use_alarm:
                signal.alarm(0)
                # getsignal/signal return None for handlers not installed from
                # Python; those cannot be re-installed, so leave them alone.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

    # Command timed out
    except Alarm:

        # The pid of our spawn, followed by the pids of the spawn of our spawn
        pids = [p.pid]
        pids.extend(get_process_children(p.pid))

        # Terminate all of the evil spawn
        for pid in pids:
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError:
                # Already gone
                pass

        # Return a timeout error
        return (-1, None)

    # Strip the trailing newline only if there is one, so output that does not
    # end in a newline keeps its last byte.
    if stdout.endswith(b"\n"):
        stdout = stdout[:-1]

    return (p.returncode, stdout)


def get_process_children(pid: int) -> list[int]:
    '''
    Return the pids of the direct children of a process.

    The list is obtained from ``ps --ppid``; only one level of children is
    reported. If ``ps`` cannot be run or prints nothing usable, an empty list
    is returned.

    :param pid: pid of the parent process
    :returns: list of child pids as integers
    '''

    # Run ps directly (no shell needed for a fixed argument list)
    try:
        p = subprocess.Popen(args=['ps', '--no-headers', '-o', 'pid', '--ppid', '%d' % pid],
                             shell=False,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    except OSError:
        # ps is not available: behave as if there were no children
        return []

    # Wait and fetch the stdout; stderr is captured only to keep it off the terminal
    stdout, _ = p.communicate()

    # One pid per whitespace-separated token; ignore anything that is not a number
    return [int(token) for token in stdout.split() if token.isdigit()]


def unitize(x: int) -> str:
    '''
    Format a byte count with a binary unit suffix.

    The value is divided by 1024 until it is below 1024 or the largest
    suffix (Y) is reached, then rendered with up to four significant digits.

    :param x: size in bytes
    :returns: e.g. "512B", "1.5K", "1G"
    '''

    suff = 'BKMGTPEZY'
    y = float(x)

    # Keep at least one suffix character so suff[0] is always valid
    while ((y >= 1024) and (len(suff) > 1)):
        y /= 1024.0
        suff = suff[1:]
    return "%.4g%s" % (y, suff[0])


class Pcache:

    def Usage(self) -> None:
        msg = """Usage: %s [flags] copy_prog [copy_flags] input output""" % self.progname
        sys.stderr.write("%s\n" % msg)  # py3, py2
    #  print>>sys.stderr, "  flags are: "
    #  "s:r:m:Cy:A:R:t:r:g:fFl:VvqpH:S:",
    #  "scratch-dir=",
    #  "storage-root=",
    #  "max-space=",
    #  "clean",
    #  "hysterisis=",
    #  "accept=",
    #  "reject=",
    #  "timeout=",
    #  "retry=",
    #  "force",
    #  "flush-cache",
    #  "guid=",
    #  "log=",
    #  "version",
    #  "verbose",
    #  "debug",
    #  "quiet",
    #  "panda",
    #  "hostname",
    #  "sitename"

    def __init__(self) -> None:
        '''Give every option and internal variable its built-in default value.'''
        # Clear the process umask before anything else happens.
        os.umask(0)

        # Locations
        self.storage_root = "/pnfs"
        self.scratch_dir = "/scratch/"
        self.pcache_dir = f"{self.scratch_dir}pcache/"
        self.log_file = f"{self.pcache_dir}pcache.log"

        # Cache size limit: max_space is either a percentage ("80%") or a size
        # with an optional unit suffix ("10T"); _convert_max_space() turns it
        # into percent_max or bytes_max.
        self.max_space = "80%"
        self.percent_max = None
        self.bytes_max = None
        # Factor applied to the limit when deciding whether cleaning may stop
        self.hysterisis = 0.75
        self.clean = False

        # Transfer behaviour
        self.transfer_timeout_as_str: str = "600"
        self.max_retries = 3
        self.guid = None
        self.accept_patterns: list[re.Pattern] = []
        self.reject_patterns: list[re.Pattern] = []
        self.force = False
        self.flush = False
        self.local_src = None
        self.skip_download = False

        # Output verbosity
        self.verbose = False
        self.quiet = False
        self.debug = False

        # Identity and PanDA reporting
        self.hostname = None
        self.sitename = None  # XXX can we get this from somewhere?
        self.update_panda = False
        self.panda_url = "https://pandaserver.cern.ch:25443/server/panda/"

        # internal variables
        self.sleep_interval = 15
        self.locks = {}
        self.deleted_guids = []
        self.version = pcacheversion

    def parse_args(self, args: list[str]) -> None:
        # handle pcache flags and leave the rest in self.args

        try:
            opts, args = getopt.getopt(args,
                                       "s:x:m:Cy:A:R:t:r:g:fFl:VvdqpPH:S:L:X:",
                                       ["scratch-dir=",
                                        "storage-root=",
                                        "max-space=",
                                        "clean",
                                        "hysterisis=",
                                        "accept=",
                                        "reject=",
                                        "timeout=",
                                        "retry=",
                                        "force",
                                        "flush-cache",
                                        "guid=",
                                        "log=",
                                        "version",
                                        "verbose",
                                        "debug",
                                        "quiet",
                                        "print-stats",
                                        "panda",
                                        "hostname",
                                        "sitename",
                                        "local-src"])

            # XXXX cache, stats, reset, clean, delete, inventory
            # TODO: move checksum/size validation from lsm to pcache
        except getopt.GetoptError as err:
            sys.stderr.write("%s\n" % str(err))
            self.Usage()
            self.fail(100)

        for opt, arg in opts:
            if opt in ("-s", "--scratch-dir"):
                self.scratch_dir = arg
                # Make sure scratch_dir endswith /
                if not self.scratch_dir.endswith("/"):
                    self.scratch_dir += "/"
                self.pcache_dir = self.scratch_dir + "pcache/"
                self.log_file = self.pcache_dir + "pcache.log"
            elif opt in ("-x", "--storage-root"):
                self.storage_root = arg
            elif opt in ("-m", "--max-space"):
                self.max_space = arg
            elif opt in ("-y", "--hysterisis"):
                if arg.endswith('%'):
                    self.hysterisis = float(arg[:-1]) / 100
                else:
                    self.hysterisis = float(arg)
            elif opt in ("-A", "--accept"):
                self.accept_patterns.append(re.compile(arg))
            elif opt in ("-R", "--reject"):
                self.reject_patterns.append(re.compile(arg))
            elif opt in ("-t", "--timeout"):
                self.transfer_timeout_as_str = arg
            elif opt in ("-f", "--force"):
                self.force = True
            elif opt in ("-F", "--flush-cache"):
                self.flush = True
            elif opt in ("-C", "--clean"):
                self.clean = True
            elif opt in ("-g", "--guid"):
                if arg == 'None':
                    self.guid = None
                else:
                    self.guid = arg
            elif opt in ("-r", "--retry"):
                self.max_retries = int(arg)
            elif opt in ("-V", "--version"):
                print(str(self.version))
                sys.exit(0)
            elif opt in ("-l", "--log"):
                self.log_file = arg
            elif opt in ("-v", "--verbose"):
                self.verbose = True
            elif opt in ("-d", "--debug"):
                self.debug = True
            elif opt in ("-q", "--quiet"):
                self.quiet = True
            elif opt in ("-p", "--print-stats"):
                self.print_stats()
                sys.exit(0)
            elif opt in ("-P", "--panda"):
                self.update_panda = True
            elif opt in ("-H", "--hostname"):
                self.hostname = arg
            elif opt in ("-S", "--sitename"):
                self.sitename = arg
            elif opt in ("-L", "--local-src"):
                self.local_src = str(arg)
            elif opt in ("-X", "--skip-download"):
                if str(arg) in ('True', 'true') or arg:
                    self.skip_download = True

        # Treatment of limits on pcache size
        self._convert_max_space()

        # Convert timeout to seconds
        mult = 1
        t = self.transfer_timeout_as_str
        suff = t[-1]
        if suff in ('H', 'h'):
            mult = 3600
            t = t[:-1]
        elif suff in ('M', 'm'):
            mult = 60
            t = t[:1]
        elif suff in ('S', 's'):
            mult = 1
            t = t[:-1]
        self.transfer_timeout: int = mult * int(t)

        # Set host and name
        if self.hostname is None:
            self.hostname = gethostname()
        if self.sitename is None:
            self.sitename = os.environ.get("SITE", "")  # XXXX

        # All done
        self.args = args

    def _convert_max_space(self) -> None:
        '''
        Added by Rucio team. Converts max allowed space usage of pcache into units used by this tool.
        :input self.max_space: limit set by user
        :output self.percent_max: max percentage of pcache space used
        :output self.bytes_max: max size in bytes of pcache space used
        '''

        # Convert max_space arg to percent_max or bytes_max
        if self.max_space.endswith('%'):
            self.percent_max = float(self.max_space[:-1])
            self.bytes_max = None
        else:  # handle suffix
            self.percent_max = None
            m = self.max_space.upper()
            index = "BKMGTPEZY".find(m[-1])
            if index >= 0:
                self.bytes_max = float(m[:-1]) * (1024**index)
            else:  # Numeric value w/o units (exception if invalid)
                self.bytes_max = float(self.max_space)

    def clean_pcache(self, max_space: Optional[str] = None) -> None:
        '''
        Added by Rucio team. Cleans pcache in case it is over limit.
        Used for tests of the pcache functionality. Can be called without other init.
        '''

        self.t0 = time.time()
        self.progname = "pcache"

        # set max. occupancy of pcache:
        if max_space:
            self.max_space = max_space
        self._convert_max_space()

        # Fail on extra args
        if not self.scratch_dir:
            self.Usage()
            self.fail(100)

        # hardcoded pcache dir
        self.pcache_dir = self.scratch_dir + '/pcache/'

        # clean pcache
        self.maybe_start_cleaner_thread()

    def check_and_link(
            self,
            src: str = '',
            dst: str = '',
            dst_prefix: str = '',
            scratch_dir: str = '/scratch/',
            storage_root: Optional[str] = None,
            force: bool = False,
            guid: Optional[str] = None,
            log_file: Optional[str] = None,
            version: str = '',
            hostname: Optional[str] = None,
            sitename: Optional[str] = None,
            local_src: Optional[str] = None
    ) -> tuple[int, Optional[int]]:
        '''
        Added by Rucio team. Replacement for the main method.
        Checks whether a file is in pcache:
         - if yes: creates a hardlink from the file in pcache to dst
         - if not:
              - returns 500 and leaves it to Rucio
              - Rucio downloads a file
        Makes hardlink in pcache to downloaded file:
         - needs :param local_src: path to downloaded file

        :param src: source name of the file; also used to derive the cache directory
        :param dst: destination path; a leading 'file:' prefix is split off
        :param dst_prefix: stored on self, overwritten when dst starts with 'file:'
        :param scratch_dir: the cache is placed in <scratch_dir>/pcache/
        :param storage_root: stored on self and used by create_pcache_dst_dir()
        :param force: empty the cache directory of this file before checking it
        :param guid: stored on self
        :param log_file: replaces self.log_file when given
        :param version: stored on self and written to the log
        :param hostname: stored on self
        :param sitename: stored on self
        :param local_src: path of an already downloaded copy to link into the cache
        :returns: (exit_status, copy_status):
                  (0, None) or (0, 1) on a cache hit - copy_status is 1 when dst
                  exists after linking;
                  (1, None) when local_src was linked into the cache;
                  (500, None) when the file is not cached and no local_src was given.
        '''
        self.t0 = time.time()
        self.progname = "pcache"
        # Only pcache_dir is derived from the argument; self.scratch_dir is left untouched
        self.pcache_dir = scratch_dir + '/pcache/'
        self.src = src
        self.dst = dst
        self.dst_prefix = dst_prefix
        self.sitename = sitename
        self.hostname = hostname
        self.guid = guid
        if log_file:
            self.log_file = log_file
        self.local_src = local_src
        self.version = version
        self.storage_root = storage_root

        # Cache dir may have been wiped
        if ((not os.path.exists(self.pcache_dir)) and self.update_panda):
            self.panda_flush_cache()

        # Create the pCache directory
        if (self.mkdir_p(self.pcache_dir)):
            self.fail(101)

        self.log(INFO, "%s %s invoked as: API", self.progname, self.version)

        # A scratch directory is required
        # NOTE(review): this check runs after the cache directory has already
        # been created from scratch_dir - confirm whether it should come first.
        if not scratch_dir:
            self.Usage()
            self.fail(100)

        # If the source is lfn:, leave the process immediately with status 1
        if (self.src.startswith('lfn:')):
            # status = os.execvp(self.copy_util, self.args)
            os._exit(1)

        # If the destination is a local file, do some rewrites
        if (self.dst.startswith('file:')):
            self.dst_prefix = 'file:'
            self.dst = self.dst[5:]
            # Leave one '/' on dst; extra leading slashes move to the prefix
            while ((len(self.dst) > 1) and (self.dst[1] == '/')):
                self.dst_prefix += '/'
                self.dst = self.dst[1:]

        # load file into pcache
        # NOTE(review): create_pcache_dst_dir() sets self.pcache_dst_dir only
        # when self.storage_root is not None, and storage_root defaults to None here.
        self.create_pcache_dst_dir()
        # XXXX TODO _ dst_dir can get deleted before lock!
        waited = False

        # Try the lock without blocking first; a true result means it is held
        # elsewhere, in which case wait for it with a blocking call.
        if (self.lock_dir(self.pcache_dst_dir, blocking=False)):
            waited = True
            self.log(INFO, "%s locked, waiting", self.pcache_dst_dir)
            self.lock_dir(self.pcache_dst_dir, blocking=True)

        if (waited):
            self.log(INFO, "waited %.2f secs", time.time() - self.t0)

        if force:
            self.empty_dir(self.pcache_dst_dir)

        # The name of the cached version of this file
        cache_file = self.pcache_dst_dir + "data"

        # Check if the file is in cache or we need to transfer it down
        if (os.path.exists(cache_file)):
            exit_status = 0
            copy_status = None
            self.log(INFO, "check_and_link: file found in cache")
            self.log(INFO, "cache hit %s", self.src)
            self.update_stats("cache_hits")
            # finish() links local_src into the cache when local_src is set,
            # otherwise it links the cached file to dst
            self.finish()
            if (os.path.exists(self.dst)):
                copy_status = 1
        elif self.local_src:
            exit_status = 1
            copy_status = None
            self.log(INFO, "check_and_link: local replica found, linking to pcache")
            self.finish()
        else:
            self.log(INFO, "check_and_link: %s file not found in pcache and was not downloaded yet", self.src)
            # NOTE(review): this returns without calling unlock_dir() on
            # self.pcache_dst_dir - confirm whether the lock is meant to stay held.
            return (500, None)

        self.unlock_dir(self.pcache_dst_dir)
        self.log(INFO, "total time %.2f secs", time.time() - self.t0)

        # in case that the pcache is over limit
        self.maybe_start_cleaner_thread()

        # Return if the file was cached, copied or an error (and its code)
        return (exit_status, copy_status)

    def main(self, args: list[str]) -> tuple[int, Optional[int]]:
        '''
        Command-line entry point: parse the flags, then either serve the
        requested file from the cache or run the copy command and cache the result.

        :param args: full command line, program name first; after pcache's own
                     flags it must contain: copy_prog [copy_flags] input output
        :returns: (exit_status, copy_status):
                  (1, None) on a cache hit;
                  (500, None) when the file is not cached and skip_download is set;
                  otherwise whatever pcache_copy_in() returns.
                  Several paths do not return at all: --flush-cache and a bare
                  --clean exit with status 0, an 'lfn:' source exits with
                  status 1, and a destination outside the scratch directory
                  (or a source filtered out by accept/reject) replaces this
                  process with the copy command.
        '''

        # args
        self.cmdline = ' '.join(args)
        self.t0 = time.time()
        self.progname = args[0] or "pcache"

        # Parse the flags
        # NOTE(review): parse_args() is annotated "-> None" and has no return
        # value in the code shown, so this branch is not expected to trigger.
        if (self.parse_args(args[1:])):
            self.Usage()
            self.fail(100)

        # Cache dir may have been wiped
        if ((not os.path.exists(self.pcache_dir)) and self.update_panda):
            self.panda_flush_cache()

        # Create the pCache directory
        if (self.mkdir_p(self.pcache_dir)):
            self.fail(101)

        self.log(INFO, "%s %s invoked as: %s", self.progname, self.version, self.cmdline)

        # Are we flushing the cache
        if (self.flush):
            if (self.args):
                sys.stderr.write("--flush not compatible with other options")
                self.fail(100)
            else:
                self.flush_cache()
                sys.exit(0)

        # Are we cleaning the cache
        if (self.clean):
            # size = self.do_cache_inventory()
            self.maybe_start_cleaner_thread()
            # Cleaning only, no copy requested
            if (len(self.args) < 1):
                sys.exit(0)

        # Need at least: copy_prog input output
        if (len(self.args) < 3):
            self.Usage()
            self.fail(100)

        # Split the remaining words: program, its flags, then source and destination
        self.copy_util = self.args[0]
        self.copy_args = self.args[1:-2]
        self.src = self.args[-2]
        self.dst = self.args[-1]
        self.dst_prefix = ''

        # If the source is lfn:, leave the process immediately with status 1
        if (self.src.startswith('lfn:')):
            # status = os.execvp(self.copy_util, self.args)
            os._exit(1)

        # If the destination is a local file, do some rewrites
        if (self.dst.startswith('file:')):
            self.dst_prefix = 'file:'
            self.dst = self.dst[5:]
            # Leave one '/' on dst; extra leading slashes move to the prefix
            while ((len(self.dst) > 1) and (self.dst[1] == '/')):
                self.dst_prefix += '/'
                self.dst = self.dst[1:]

        # Bypass the cache (run the original command in place of this process)
        # unless the destination is under the scratch dir and the source passes
        # the accept/reject filters
        if (not (self.dst.startswith(self.scratch_dir) and self.accept(self.src) and (not self.reject(self.src)))):
            os.execvp(self.copy_util, self.args)  # noqa: S606
            # Only reached if execvp returns
            os._exit(1)

        # XXXX todo:  fast-path - try to acquire lock
        # first, if that succeeds, don't call
        # create_pcache_dst_dir

        # load file into pcache
        self.create_pcache_dst_dir()
        # XXXX TODO _ dst_dir can get deleted before lock!
        waited = False

        # Try the lock without blocking first; a true result means it is held
        # elsewhere, in which case wait for it with a blocking call.
        if (self.lock_dir(self.pcache_dst_dir, blocking=False)):
            waited = True
            self.log(INFO, "%s locked, waiting", self.pcache_dst_dir)
            self.lock_dir(self.pcache_dst_dir, blocking=True)

        if (waited):
            self.log(INFO, "waited %.2f secs", time.time() - self.t0)

        if (self.force):
            self.empty_dir(self.pcache_dst_dir)

        # The name of the cached version of this file
        cache_file = self.pcache_dst_dir + "data"

        # Check if the file is in cache or we need to transfer it down
        if (os.path.exists(cache_file)):
            exit_status = 1
            copy_status = None
            self.log(INFO, "cache hit %s", self.src)
            self.update_stats("cache_hits")
            self.finish()
        else:
            if self.skip_download:
                # NOTE(review): this returns without calling unlock_dir() on
                # self.pcache_dst_dir - confirm whether the lock is meant to stay held.
                return (500, None)
            self.update_stats("cache_misses")
            exit_status, copy_status = self.pcache_copy_in()
            # 0 = copied at the first attempt, 2 = copied after retries
            if ((exit_status == 0) or (exit_status == 2)):
                self.finish()

        self.unlock_dir(self.pcache_dst_dir)
        self.log(INFO, "total time %.2f secs", time.time() - self.t0)

        # Start a clean-up if the cache is now over its limit
        self.maybe_start_cleaner_thread()

        # Return if the file was cached, copied or an error (and its code)
        return (exit_status, copy_status)

    def finish(self, local_src: Optional[str] = None) -> None:
        cache_file = self.pcache_dst_dir + "data"
        self.update_mru()
        if self.local_src:
            if (self.make_hard_link(self.local_src, cache_file)):
                self.fail(102)
        else:
            if (self.make_hard_link(cache_file, self.dst)):
                self.fail(102)

    def pcache_copy_in(self) -> tuple[int, Optional[int]]:

        cache_file = self.pcache_dst_dir + "data"

        # Record source URL
        try:
            fname = self.pcache_dst_dir + "src"
            f = open(fname, 'w')
            f.write(self.src + '\n')
            f.close()
            self.chmod(fname, 0o666)
        except:
            pass

        # Record GUID if given
        if (self.guid):
            try:
                fname = self.pcache_dst_dir + "guid"
                f = open(fname, 'w')
                f.write(self.guid + '\n')
                f.close()
                self.chmod(fname, 0o666)
            except:
                pass

        # Try to transfer the file up the the number of retries allowed
        retry = 0
        while (retry <= self.max_retries):

            # Is this is a retry attempt, log it as such
            if (retry > 0):
                self.log(INFO, "do_transfer: retry %s", retry)

            # Do the transfer. exit_status will be either
            # 0 - success
            # 3 - Transfer command failed. copy_status has the return code
            # 4 - Transfer command timed out
            # 5 - Transfer command was not found
            exit_status, copy_status = self.do_transfer()

            # If success, stop trying, otherwise increment the retry count
            if (exit_status == 0):
                break
            retry += 1

        # Did the transfer succeed
        if (exit_status == 0):

            # If we succeeded on a retry, return status 2 and the retries
            if (retry == 0):
                copy_status = None
            else:
                exit_status = 2
                copy_status = retry

            # Update the cache information
            if self.local_src:
                self.update_cache_size(os.stat(self.local_src).st_size)
            else:
                self.update_cache_size(os.stat(cache_file).st_size)

            # Update the panda cache
            if (self.guid and self.update_panda):
                self.panda_add_cache_files((self.guid,))

        return (exit_status, copy_status)

    def create_pcache_dst_dir(self) -> None:

        d = self.src
        if self.storage_root is not None:
            index = d.find(self.storage_root)

            if (index >= 0):
                d = d[index:]
            else:
                index = d.find("SFN=")
                if (index >= 0):
                    d = d[index + 4:]

            # self.log(INFO, '%s', self.storage_root)
            # self.log(INFO, '%s', d)
            # XXXX any more patterns to look for?
            d = os.path.normpath(self.pcache_dir + "CACHE/" + d)
            if (not d.endswith('/')):
                d += '/'

            self.pcache_dst_dir = d
            status = self.mkdir_p(d)
            if (status):
                self.log(ERROR, "mkdir %s %s", d, status)
                self.fail(103)

    def get_disk_usage(self) -> int:
        p = os.popen("df -P %s | tail -1" % self.pcache_dir, 'r')  # noqa: S605
        data = p.read()
        status = p.close()
        if status:
            self.log(ERROR, "get_disk_usage: df command failed, status=%s", status)
            sys.exit(1)
        tok = data.split()
        percent = tok[-2]
        if not percent.endswith('%'):
            self.log(ERROR, "get_disk_usage: cannot parse df output: %s", data)
            sys.exit(1)
        percent = int(percent[:-1])
        return percent

    def over_limit(self, factor: float = 1.0) -> bool:
        if self.percent_max:
            return self.get_disk_usage() > factor * self.percent_max
        if self.bytes_max:
            cache_size = self.get_cache_size()
            if cache_size is not None:
                return cache_size > factor * self.bytes_max
        return False

    def clean_cache(self) -> None:
        '''
        Delete cached entries, in the order given by list_by_mru(), until the
        cache is no longer over its limit scaled by self.hysterisis.

        Each MRU entry is a symlink to a cache directory; the directory is
        emptied and the link removed. If a link points to something that no
        longer exists, the whole cache is flushed and cleaning stops.
        '''
        started = time.time()
        size_before = self.get_cache_size()

        if size_before is not None:
            self.log(INFO, "starting cleanup, cache size=%s, usage=%s%%",
                     unitize(size_before),
                     self.get_disk_usage())

        for mru_link in self.list_by_mru():
            try:
                target = os.readlink(mru_link)
            except OSError as err:
                self.log(ERROR, "readlink: %s", err)
                continue

            self.log(DEBUG, "deleting %s", target)

            # A dangling entry means MRU and CACHE disagree: start over
            if not os.path.exists(target):
                self.log(WARN, "Attempt to delete missing file %s", target)
                self.flush_cache()
                break
            self.empty_dir(target)

            # empty_dir should also delete MRU symlink, but
            # mop up here in case there is some problem with the
            # backlink
            try:
                os.unlink(mru_link)
            except OSError as err:
                if err.errno != errno.ENOENT:
                    self.log(ERROR, "unlink: %s", err)

            # Stop as soon as enough space has been freed
            if not self.over_limit(self.hysterisis):
                break

        self.log(INFO, "cleanup complete, cache size=%s, usage=%s%%, time=%.2f secs",
                 self.get_cache_size(),
                 self.get_disk_usage(),
                 time.time() - started)

    def list_by_mru(self) -> "Iterator[str]":
        mru_dir = self.pcache_dir + "MRU/"
        for root, dirs, files in os.walk(mru_dir):
            dirs.sort()
            for d in dirs:
                path = os.path.join(root, d)
                if os.path.islink(path):
                    dirs.remove(d)
                    yield path
            if files:
                files.sort()
                for file in files:
                    path = os.path.join(root, file)
                    yield path

    def flush_cache(self) -> None:
        '''
        Delete everything in CACHE and MRU, and reset the stats.

        Each directory is first renamed to "<name>.<timestamp>", so the cache
        is empty immediately; the renamed directory is then removed by a
        background "rm -rf". A directory that does not exist is skipped
        silently, other rename errors are logged.
        '''
        import shlex  # only needed here, to quote the path for the shell

        self.log(INFO, "flushing cache")
        if self.update_panda:
            self.panda_flush_cache()
        self.reset_stats()
        ts = '.' + str(time.time())
        for d in "CACHE", "MRU":
            d = self.pcache_dir + d
            try:
                os.rename(d, d + ts)
                # Quote the path so spaces or shell metacharacters in the cache
                # location cannot change what gets removed.
                os.system("rm -rf -- %s &" % shlex.quote(d + ts))  # noqa: S605
            except OSError as e:
                if e.errno != errno.ENOENT:
                    self.log(ERROR, "%s: %s", d, e)

    def do_transfer(self) -> tuple[int, Optional[int]]:

        # Cache file and transfer file locations
        cache_file = self.pcache_dst_dir + "data"
        xfer_file = self.pcache_dst_dir + "xfer"

        # Remove any transfer file with the same name
        try:
            os.unlink(xfer_file)
        except OSError as e:
            if e.errno != errno.ENOENT:
                self.log(ERROR, "unlink: %s", e)

        # Build the copy command with the destination into the xfer location
        args = self.args[:]
        args[-1] = self.dst_prefix + xfer_file

        # Save the current time for timing output
        t0 = time.time()

        # Do the copy with a timeout
        if self.local_src:
            return (0, None)
        else:
            copy_status, copy_output = run_cmd(args, self.transfer_timeout)

        # Did the command timeout
        if (copy_status == -1):
            self.log(ERROR, "copy command timed out, elapsed time = %.2f sec", time.time() - t0)
            self.cleanup_failed_transfer()
            return (4, None)
        elif (copy_status == -2):
            self.log(ERROR, "copy command was not found")
            self.cleanup_failed_transfer()
            return (5, None)

        # Display any output from the copy
        if (copy_output):
            print('%s' % copy_output)

        # Did the copy succeed (good status and an existing file)
        if ((copy_status > 0) or (not os.path.exists(xfer_file))):
            self.log(ERROR, "copy command failed, elapsed time = %.2f sec", time.time() - t0)
            self.cleanup_failed_transfer()
            return (3, copy_status)

        self.log(INFO, "copy command succeeded, elapsed time = %.2f sec", time.time() - t0)

        try:
            os.rename(xfer_file, cache_file)
            # self.log(INFO, "rename %s %s", xfer_file, cache_file)
        except OSError:  # Fatal error if we can't do this
            self.log(ERROR, "rename %s %s", xfer_file, cache_file)
            try:
                os.unlink(xfer_file)
            except:
                pass
            self.fail(104)

        # Make the file readable to all
        self.chmod(cache_file, 0o666)

        # Transfer completed, return the transfer command status
        return (0, None)

    def maybe_start_cleaner_thread(self) -> None:
        if not self.over_limit():
            return
        # exit immediately if another cleaner is active
        cleaner_lock = os.path.join(self.pcache_dir, ".clean")
        if self.lock_file(cleaner_lock, blocking=False):
            self.log(INFO, "cleanup not starting:  %s locked", cleaner_lock)
            return
        # see http://www.faqs.org/faqs/unix-faq/faq/part3/section-13.html
        # for explanation of double-fork
        pid = os.fork()
        if pid:  # parent
            os.waitpid(pid, 0)
            return
        else:  # child
            self.daemonize()
            pid = os.fork()
            if pid:
                os._exit(0)
            # grandchild
            self.clean_cache()
            self.unlock_file(cleaner_lock)
            os._exit(0)

    def make_hard_link(self, src: "StrOrBytesPath", dst: "StrOrBytesPath") -> Optional[int]:
        '''
        Create dst as a hard link to src, replacing whatever is at dst.

        Errors are logged, not raised. When the error is "no such file", the
        stat results of both paths are logged as well to help diagnosis.

        :param src: existing file
        :param dst: path of the link to create
        :returns: None on success, otherwise the errno of the failed operation
        '''
        self.log(INFO, "linking %s to %s", src, dst)
        try:
            # lexists also sees a dangling symlink, which would otherwise
            # make os.link fail with "file exists"
            if os.path.lexists(dst):
                os.unlink(dst)
            os.link(src, dst)
        except OSError as e:
            self.log(ERROR, "make_hard_link: %s", e)
            ret = e.errno
            if ret == errno.ENOENT:
                # Report which of the two paths is the missing one
                for path in (src, dst):
                    try:
                        stat_info = os.stat(path)
                        self.log(INFO, "stat(%s) = %s", path, stat_info)
                    except OSError:
                        self.log(INFO, "cannot stat %s", path)
            return ret
        return None

    def reject(self, name: str) -> bool:
        for pat in self.reject_patterns:
            if pat.search(name):
                return True
        return False

    def accept(self, name: str) -> bool:
        if not self.accept_patterns:
            return True
        for pat in self.accept_patterns:
            if pat.search(name):
                return True
        return False

    def get_stat(self, stats_dir: str, stat_name: str) -> int:
        filename = os.path.join(self.pcache_dir, stats_dir, stat_name)
        try:
            f = open(filename, 'r')
            data = int(f.read().strip())
            f.close()
        except:
            data = 0
        return data

    def print_stats(self) -> None:
        '''Print the cache size and the hit and miss counters, one per line.'''
        # Format each line; passing a tuple to print() would show the tuple itself
        print("Cache size: %s" % unitize(self.get_stat("CACHE", "size")))
        print("Cache hits: %s" % self.get_stat("stats", "cache_hits"))
        print("Cache misses: %s" % self.get_stat("stats", "cache_misses"))

    def reset_stats(self) -> None:
        stats_dir = os.path.join(self.pcache_dir, "stats")
        try:
            for f in os.listdir(stats_dir):
                try:
                    os.unlink(os.path.join(stats_dir, f))
                except:
                    pass
        except:
            pass
        # XXXX error handling
        pass

    def update_stat_file(self, stats_dir: str, name: str, delta: int) -> None:  # internal
        """
        Add ``delta`` to the integer counter in ``<pcache_dir>/<stats_dir>/<name>``.

        The directory is created when needed and its lock is held for the
        whole read-modify-write. A missing or unparsable counter counts as 0;
        a failure to write the new value is ignored.

        :param stats_dir: sub-directory of ``pcache_dir`` holding the counter.
        :param name: file name of the counter.
        :param delta: amount to add (may be negative).
        """
        stats_dir = os.path.join(self.pcache_dir, stats_dir)
        self.mkdir_p(stats_dir)
        self.lock_dir(stats_dir)
        try:
            stats_file = os.path.join(stats_dir, name)
            try:
                with open(stats_file, 'r') as f:
                    value = int(f.read())
            except (OSError, ValueError):
                value = 0
            value += delta
            try:
                with open(stats_file, 'w') as f:
                    f.write("%s\n" % value)
                self.chmod(stats_file, 0o666)
            except OSError:
                pass
        finally:
            # Always release the directory lock, even on an unexpected error
            self.unlock_dir(stats_dir)

    def update_stats(self, name: str, delta: int = 1) -> None:
        """Add ``delta`` to the counter ``name`` kept in the "stats" directory."""
        return self.update_stat_file(stats_dir="stats", name=name, delta=delta)

    def update_cache_size(self, bytes_: int) -> None:
        """Add ``bytes_`` (may be negative) to the size recorded in ``CACHE/size``."""
        return self.update_stat_file(stats_dir="CACHE", name="size", delta=bytes_)

    def get_cache_size(self) -> Optional[int]:
        filename = os.path.join(self.pcache_dir, "CACHE", "size")
        size = 0

        try:
            f = open(filename)
            data = f.read()
            size = int(data)
        except:
            pass

        # If we could not fetch the size, do a reinventory
        if size == 0:
            size = self.do_cache_inventory()

        # The size should never be negative, so lets cleanup and start over
        if size is not None and size < 0:
            self.log(WARN, "CACHE corruption found. Negative CACHE size: %d", size)
            self.flush_cache()
            size = 0

        return size

    def do_cache_inventory(self) -> Optional[int]:
        """
        Recompute the cache size from disk and record it in ``CACHE/size``.

        The size is the sum of the sizes of all files named ``data`` found
        below ``pcache_dir``. The walk runs under a non-blocking lock on
        ``<pcache_dir>/.inventory``.

        :returns: the total size in bytes, or ``None`` when ``lock_file``
                  reported a failure for the inventory lock.
        """
        inventory_lock = os.path.join(self.pcache_dir, ".inventory")
        if self.lock_file(inventory_lock, blocking=False):
            return None

        try:
            size = 0

            self.log(INFO, "starting inventory")

            for root, _dirs, files in os.walk(self.pcache_dir):
                if "data" not in files:
                    continue
                fullname = os.path.join(root, "data")
                try:
                    size += os.stat(fullname).st_size
                except OSError as e:
                    self.log(ERROR, "stat(%s): %s", fullname, e)

            filename = os.path.join(self.pcache_dir, "CACHE", "size")

            # Recording the result is best effort
            try:
                with open(filename, 'w') as f:
                    f.write("%s\n" % size)
                self.chmod(filename, 0o666)
            except OSError:
                pass
        finally:
            # Release the inventory lock even when something unexpected fails
            self.unlock_file(inventory_lock)

        self.log(INFO, "inventory complete, cache size %s", size)
        return size

    def daemonize(self) -> None:
        if self.debug:
            return
        try:
            os.setsid()
        except OSError:
            pass
        try:
            os.chdir("/")
        except OSError:
            pass
        try:
            os.umask(0)
        except OSError:
            pass
        n = os.open("/dev/null", os.O_RDWR)
        i, o, e = sys.stdin.fileno(), sys.stdout.fileno(), sys.stderr.fileno()
        os.dup2(n, i)
        os.dup2(n, o)
        os.dup2(n, e)
        MAXFD = 1024
        try:
            import resource  # Resource usage information.
            maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
            if (maxfd == resource.RLIM_INFINITY):
                maxfd = MAXFD
        except:
            try:
                maxfd = os.sysconf("SC_OPEN_MAX")
            except:
                maxfd = MAXFD  # use default value

        for fd in range(0, maxfd + 1):
            try:
                os.close(fd)
            except:
                pass

    # Panda server callback functions
    def do_http_post(self, url: str, data: "_QueryType") -> None:
        """
        POST ``data`` to ``url`` from a detached background process.

        The caller only waits for a short-lived intermediate child. The
        request itself is sent by a grandchild, which retries every 2 minutes,
        up to 30 times, until the response body is exactly ``True``.

        :param url: address to post to.
        :param data: form fields, passed through ``urlencode``.
        """
        # see http://www.faqs.org/faqs/unix-faq/faq/part3/section-13.html
        # for explanation of double-fork (is it overkill here?)
        pid = os.fork()
        if pid:  # parent
            os.waitpid(pid, 0)
            return

        # child: from here on this process must never return into the
        # caller's code, whatever happens
        try:
            self.daemonize()
            if os.fork():
                os._exit(0)
            # grandchild
            retry = 0
            # This will retry for up to 1 hour, at 2 minute intervals
            while retry < 30:
                try:
                    # urlopen() only accepts the POST body as bytes
                    u = urlopen(url, data=urlencode(data).encode("ascii"))
                    try:
                        # the response body is bytes too; decode it so it can
                        # be compared with a str
                        ret = u.read().decode("utf-8", "replace")
                    finally:
                        u.close()
                    self.log(INFO, "http post to %s, retry %s, data='%s', return='%s'",
                             url, retry, data, ret)
                    if ret == "True":
                        break
                except Exception as e:
                    self.log(ERROR, "post to %s, data=%s, error=%s", url, data, e)
                retry += 1
                time.sleep(120)
        finally:
            # finished, don't keep the child thread around!
            os._exit(0)

    def panda_flush_cache(self) -> None:
        """POST this site and node to the ``flushCacheDB`` endpoint under ``panda_url``."""
        endpoint = self.panda_url + "flushCacheDB"
        payload = {"site": self.sitename, "node": self.hostname}
        self.do_http_post(endpoint, data=payload)

    def panda_add_cache_files(self, guids: "Iterable[str]") -> None:
        """
        POST ``guids`` to the ``addFilesToCacheDB`` endpoint under ``panda_url``.

        The GUIDs are sent as one comma-separated string together with this
        site and node.
        """
        endpoint = self.panda_url + "addFilesToCacheDB"
        payload = {
            "site": self.sitename,
            "node": self.hostname,
            "guids": ','.join(guids),
        }
        self.do_http_post(endpoint, data=payload)

    def panda_del_cache_files(self, guids: "Iterable[str]") -> None:
        """
        POST ``guids`` to the ``deleteFilesFromCacheDB`` endpoint under ``panda_url``.

        The GUIDs are sent as one comma-separated string together with this
        site and node.
        """
        endpoint = self.panda_url + "deleteFilesFromCacheDB"
        payload = {
            "site": self.sitename,
            "node": self.hostname,
            "guids": ','.join(guids),
        }
        self.do_http_post(endpoint, data=payload)

    # Locking functions
    def lock_dir(self, d: str, create: bool = True, blocking: bool = True) -> Optional[int]:
        """
        Lock directory ``d`` by locking the ``LOCK_NAME`` file inside it.

        :param d: directory to lock.
        :param create: when the lock file cannot be opened because of ENOENT,
                       create ``d`` and try once more.
        :param blocking: passed on to ``lock_file``.
        :returns: ``None`` when the first attempt succeeded, otherwise the
                  status of the last ``lock_file`` call. Calls
                  ``self.fail(105)`` when ``d`` cannot be created.
        """
        lock_path = os.path.join(d, LOCK_NAME)
        result = self.lock_file(lock_path, blocking)
        if not result:  # succeeded
            return None
        if create and result == errno.ENOENT:
            failure = self.mkdir_p(d)
            if failure:
                self.log(ERROR, "mkdir %s %s", d, failure)
                self.fail(105)
            result = self.lock_file(lock_path, blocking)
        return result

    def unlock_dir(self, d: str) -> Optional[Any]:
        """Release the lock on directory ``d`` by unlocking its ``LOCK_NAME`` file."""
        lock_path = os.path.join(d, LOCK_NAME)
        return self.unlock_file(lock_path)

    def lock_file(self, name: str, blocking: bool = True) -> Optional[int]:
        if name in self.locks:
            self.log(DEBUG, "lock_file: %s already locked", name)
            return
        try:
            f = open(name, 'w')
        except OSError as e:
            self.log(ERROR, "open: %s", e)
            return e.errno

        self.locks[name] = f
        flag = fcntl.LOCK_EX
        if not blocking:
            flag |= fcntl.LOCK_NB
        while True:
            try:
                status = fcntl.lockf(f, flag)
                break
            except OSError as e:
                if e.errno in (errno.EAGAIN, errno.EACCES) and not blocking:
                    f.close()
                    del self.locks[name]
                    return e.errno
                if e.errno != errno.EINTR:
                    status = e.errno
                    self.log(ERROR, "lockf: %s", e)
                    self.fail(106)
        return status

    def unlock_file(self, name: str) -> Optional[Any]:
        f = self.locks.get(name)
        if not f:
            self.log(DEBUG, "unlock_file: %s not locked", name)
            return

        # XXXX does this create a possible race condition?
        if 0:
            try:
                os.unlink(name)
            except:
                pass
        status = fcntl.lockf(f, fcntl.LOCK_UN)
        f.close()
        del self.locks[name]
        return status

    def unlock_all(self) -> None:
        for filename, f in list(self.locks.items()):
            try:
                f.close()
                os.unlink(filename)
            except:
                pass

    # Cleanup functions
    def delete_file_and_parents(self, name: str) -> None:
        try:
            os.unlink(name)
        except OSError as e:
            if e.errno != errno.ENOENT:
                self.log(ERROR, "unlink: %s", e)
                self.fail(107)
        self.delete_parents_recursive(name)

    def delete_parents_recursive(self, name: str) -> None:  # internal
        try:
            dirname = os.path.dirname(name)
            if not os.listdir(dirname):
                os.rmdir(dirname)
                self.delete_parents_recursive(dirname)
        except OSError as e:
            self.log(DEBUG, "delete_parents_recursive: %s", e)

    def update_mru(self) -> None:
        """
        Re-register ``pcache_dst_dir`` in the MRU tree under the current time.

        Two symlinks are maintained: an entry below
        ``<pcache_dir>MRU/YYYY/mm/dd/HH/MM/`` that points at
        ``pcache_dst_dir``, and ``<pcache_dst_dir>mru`` that points back at
        that entry. A previous entry, if there is one, is deleted first
        together with its emptied parent directories.

        Failures end in ``self.fail``: 108 when the old ``mru`` link cannot be
        removed, 109 when one of the symlinks cannot be created or replaced.
        """
        now = time.time()
        # NOTE(review): paths are built by plain string concatenation, so both
        # pcache_dst_dir and pcache_dir presumably end with a path separator.
        link_to_mru = self.pcache_dst_dir + "mru"
        # exists() follows the link, so this branch is only taken while the
        # old MRU entry is still present
        if os.path.exists(link_to_mru):
            link = os.readlink(link_to_mru)
            self.delete_file_and_parents(link)

        # Drop the old back-link; it is fine if there was none
        try:
            os.unlink(link_to_mru)
        except OSError as e:
            if e.errno != errno.ENOENT:
                self.log(ERROR, "unlink: %s", e)
                self.fail(108)

        # One directory level per time unit, down to the minute
        mru_dir = self.pcache_dir + "MRU/" + time.strftime("%Y/%m/%d/%H/%M/",
                                                           time.localtime(now))

        self.mkdir_p(mru_dir)

        # The entry is named after the seconds within the minute, with
        # millisecond precision; a numeric suffix is appended on a name clash
        name = "%.3f" % (now % 60)
        ext = 0
        while True:
            if ext:
                link_from_mru = "%s%s-%s" % (mru_dir, name, ext)
            else:
                link_from_mru = "%s%s" % (mru_dir, name)

            try:
                os.symlink(self.pcache_dst_dir, link_from_mru)
                break
            except OSError as e:
                if e.errno == errno.EEXIST:
                    ext += 1  # add an extension & retry if file exists
                    continue
                else:
                    self.log(ERROR, "symlink: %s %s", e, link_from_mru)
                    self.fail(109)

        # Create the back-link; if one appeared in the meantime, remove it
        # and try again
        while True:
            try:
                os.symlink(link_from_mru, link_to_mru)
                break
            except OSError as e:
                if e.errno == errno.EEXIST:
                    try:
                        os.unlink(link_to_mru)
                    except OSError as e:
                        if e.errno != errno.ENOENT:
                            self.log(ERROR, "unlink: %s %s", e, link_to_mru)
                            self.fail(109)
                else:
                    self.log(ERROR, "symlink: %s %s", e, link_from_mru)
                    self.fail(109)

    def cleanup_failed_transfer(self) -> None:
        try:
            os.unlink(self.pcache_dir + 'xfer')
        except:
            pass

    def empty_dir(self, d: str) -> None:
        """
        Delete every entry of the cache directory ``d``.

        Some entries get extra treatment before they are unlinked:

        * ``data``: its size is subtracted from the recorded cache size;
        * ``guid``: its content is appended to ``self.deleted_guids``;
        * ``mru`` (a symlink): the MRU entry it points at is deleted as well.

        Afterwards ``delete_parents_recursive(d)`` prunes the directories that
        became empty. Entries that cannot be removed are logged at WARN level.

        :param d: directory to empty.
        """
        bytes_deleted = 0
        for name in os.listdir(d):
            size = 0
            fullname = os.path.join(d, name)
            if name == "data":
                try:
                    size = os.stat(fullname).st_size
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        self.log(WARN, "stat: %s", e)
            elif name == "guid":
                # Best effort: an unreadable guid file is skipped
                try:
                    with open(fullname) as f:
                        guid = f.read().strip()
                    self.deleted_guids.append(guid)
                except (OSError, ValueError):
                    pass
            elif name == "mru" and os.path.islink(fullname):
                try:
                    mru_file = os.readlink(fullname)
                    os.unlink(fullname)
                    self.delete_file_and_parents(mru_file)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        self.log(WARN, "empty_dir: %s", e)
            try:
                if self.debug:
                    # Apply the format explicitly instead of printing a tuple
                    print("UNLINK %s" % (fullname,))
                os.unlink(fullname)
                bytes_deleted += size
            except OSError as e:
                if e.errno != errno.ENOENT:
                    self.log(WARN, "empty_dir2: %s", e)
        self.update_cache_size(-bytes_deleted)
        self.delete_parents_recursive(d)

    def chmod(self, path: str, mode: int) -> None:
        try:
            os.chmod(path, mode)
        except OSError as e:
            if e.errno != errno.EPERM:  # Cannot chmod files we don't own!
                self.log(ERROR, "chmod %s %s", path, e)

    def mkdir_p(self, d: str, mode: int = 0o777) -> Optional[int]:
        # Thread-safe
        try:
            os.makedirs(d, mode)
            return 0
        except OSError as e:
            if e.errno == errno.EEXIST:
                pass
            else:
                # Don't use log here, log dir may not exist
                sys.stderr.write("%s\n" % str(e))
                return e.errno

    def log(self, level: str, msg: str, *args) -> None:

        # Disable all logging
        if (self.quiet):
            return

        if ((level == DEBUG) and (not self.debug)):
            return

        msg = "%s %s %s %s %s\n" % (time.strftime("%F %H:%M:%S"),
                                    sessid,
                                    self.hostname,
                                    level,
                                    str(msg) % args)

        try:
            f = open(self.log_file, "a+", 0o666)
            f.write(msg)
            f.close()

        except Exception as e:
            sys.stderr.write("%s\n" % str(e))
            sys.stderr.write(msg)
            sys.stderr.flush()

        if (self.debug or self.verbose or (level == ERROR)):
            sys.stderr.write(msg)
            sys.stderr.flush()

    def fail(self, errcode: int = 1) -> None:
        self.unlock_all()
        sys.exit(errcode)

##################################################################################

# pCache exit_status will be
#
#    0 - File was transferred into cache and is ready
#    1 - File is cached and ready to use
#    2 - File was transferred but with a retry (copy_status has the retry count)
#    3 - Transfer command failed (copy_status has the transfer return code)
#    4 - Transfer command timed out
#
#  100 - Usage error
#  101 - Cache directory does not exist
#  102 - Cache hard link error
#  103 - Cache destination mkdir error
#  104 - Cache rename error
#  105 - Cache locking error
#  106 - Cache file locking error
#  107 - Cache cleanup error
#  108 - Cache MRU update error
#  109 - Cache MRU link error
#  500 - Is file in pcache? No other action


if (__name__ == "__main__"):

    # Script entry point: create the cache object
    p = Pcache()

    # Hand over the complete command line, program name included
    args = sys.argv

    # main() returns (exit_status, copy_status); only exit_status is used
    # below (see the exit status table above)
    exit_status, copy_status = p.main(args)

    # Take us home percy...
    sys.exit(exit_status)
